feat: implement DataWriter for Iceberg data files #552
shangxinli wants to merge 1 commit into apache:main
Conversation
Force-pushed from 8944a75 to a201953
src/iceberg/data/data_writer.cc
Outdated
```cpp
ICEBERG_ASSIGN_OR_RAISE(writer_,
                        WriterFactoryRegistry::Open(options_.format, writer_options));
return {};
```
It is odd that an empty structure is always returned. Also, since this is initialization, why not do it in the ctor?
Refactored the initialization logic
```cpp
if (closed_) {
  return InvalidArgument("Writer already closed");
}
```
I could see a case for making Close() idempotent. Is there any strong reason to return this error instead of, for example, a no-op?
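For concreteness, a minimal sketch of the idempotent variant, reusing the Status machinery and the closed_/writer_ members quoted above; this reflects the reviewer's suggestion, not necessarily the exact merged code:

```cpp
// Sketch: idempotent Close(). A second call becomes a successful no-op
// instead of returning InvalidArgument.
Status Close() {
  if (closed_) {
    return {};  // already closed: nothing to do
  }
  ICEBERG_RETURN_UNEXPECTED(writer_->Close());
  closed_ = true;
  return {};
}
```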
```cpp
    return InvalidArgument("Writer already closed");
  }
  ICEBERG_RETURN_UNEXPECTED(writer_->Close());
  closed_ = true;
```
Should this class address thread safety?
Good question! I've added explicit documentation that this class is not thread-safe.
I don't think a single writer (or reader) should support thread safety, so it is fine not to add a comment like this.
src/iceberg/test/data_writer_test.cc
Outdated
```cpp
TEST_F(DataWriterTest, CreateWithParquetFormat) {
  DataWriterOptions options{
      .path = "test_data.parquet",
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = FileFormatType::kParquet,
      .io = file_io_,
      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
  };

  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());
  ASSERT_NE(writer, nullptr);
}

TEST_F(DataWriterTest, CreateWithAvroFormat) {
  DataWriterOptions options{
      .path = "test_data.avro",
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = FileFormatType::kAvro,
      .io = file_io_,
  };

  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());
  ASSERT_NE(writer, nullptr);
}
```
nit: The two tests are quite similar; it is probably possible to leverage a helper function to reduce duplication.
Consolidated the two tests using parameterized testing.
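For reference, one way such a consolidation can look with GoogleTest value-parameterized tests; the suite name and the (path, format) pairing below are illustrative assumptions, not the exact code from this PR:

```cpp
// Sketch: one value-parameterized test covering both formats. The
// fixture members and option fields mirror the tests quoted above.
class DataWriterFormatTest
    : public DataWriterTest,
      public ::testing::WithParamInterface<std::pair<std::string, FileFormatType>> {};

TEST_P(DataWriterFormatTest, CreateWriter) {
  const auto& [path, format] = GetParam();
  DataWriterOptions options{
      .path = path,
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = format,
      .io = file_io_,
  };
  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  ASSERT_NE(writer_result.value(), nullptr);
}

INSTANTIATE_TEST_SUITE_P(
    Formats, DataWriterFormatTest,
    ::testing::Values(std::make_pair("test_data.parquet", FileFormatType::kParquet),
                      std::make_pair("test_data.avro", FileFormatType::kAvro)));
```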
```cpp
// Check length before close
auto length_result = writer->Length();
ASSERT_THAT(length_result, IsOk());
EXPECT_GT(length_result.value(), 0);
```
nit: check the size of the data passed to the write function?
src/iceberg/data/data_writer.cc
Outdated
```cpp
if (!writer_) {
  return InvalidArgument("Writer not initialized");
}
```
Suggested change:
```diff
-    if (!writer_) {
-      return InvalidArgument("Writer not initialized");
-    }
+    ICEBERG_PRECHECK(writer_, "Writer not initialized");
```
nit: this should make the code shorter.
Replaced all manual null checks with ICEBERG_PRECHECK
src/iceberg/data/data_writer.cc
Outdated
```cpp
  }

  Result<FileWriter::WriteResult> Metadata() {
    if (!closed_) {
```
nit: use ICEBERG_CHECK here
src/iceberg/test/data_writer_test.cc
Outdated
```cpp
  EXPECT_GT(length.value(), 0);
}

}  // namespace
```
nit: move this closing namespace brace before the first TEST_F?
Force-pushed from 90d324e to 153d763
Implements DataWriter class for writing Iceberg data files as part of issue apache#441 (task 2).

Implementation:
- Static factory method DataWriter::Make() for creating writer instances
- Support for Parquet and Avro file formats via WriterFactoryRegistry
- Complete DataFile metadata generation including partition info, column statistics, serialized bounds, and sort order ID
- Proper lifecycle management with Write/Close/Metadata methods
- Idempotent Close(): multiple calls succeed (no-op after first)
- PIMPL idiom for ABI stability
- Not thread-safe (documented)

Tests:
- 13 comprehensive unit tests including parameterized format tests
- Coverage: creation, write/close lifecycle, metadata generation, error handling, feature validation, and data size verification
- All tests passing (13/13)

Related to apache#441
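For readers following along, the PIMPL shape described above looks roughly like this; the signatures are inferred from the snippets quoted in this thread rather than copied from the patch:

```cpp
// Sketch: the public DataWriter forwards to an Impl defined only in
// data_writer.cc, so internal members can change without breaking ABI.
class DataWriter {
 public:
  static Result<std::unique_ptr<DataWriter>> Make(DataWriterOptions options);
  ~DataWriter();

  Status Write(ArrowArray* data);              // append a batch of rows
  Status Close();                              // idempotent
  Result<FileWriter::WriteResult> Metadata();  // only valid after Close()

 private:
  class Impl;  // defined in data_writer.cc
  std::unique_ptr<Impl> impl_;
};
```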
Force-pushed from 153d763 to 147f25b
```cpp
class DataWriter::Impl {
 public:
  static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
    WriterOptions writer_options;
```
nit: use aggregate initialization for writer_options
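A sketch of what the reviewer means, with hypothetical WriterOptions field names (the real field set lives in the writer headers):

```cpp
// Sketch: designated initializers instead of default construction plus
// field-by-field assignment. Field names here are assumptions.
WriterOptions writer_options{
    .path = options.path,
    .schema = options.schema,
    .io = options.io,
    .properties = options.properties,
};
```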
```cpp
  }

  Status Write(ArrowArray* data) {
    ICEBERG_PRECHECK(writer_, "Writer not initialized");
```
Will this check ever fail? If not, should we remove the check or use ICEBERG_DCHECK instead? Same question for below.
```cpp
  }

  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
```
Suggested change:
```diff
-    ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
```
We should return an invalid-state error instead of an invalid-argument error in this case.
```cpp
    data_file->file_path = options_.path;
    data_file->file_format = options_.format;
    data_file->partition = options_.partition;
    data_file->record_count = metrics.row_count.value_or(0);
```
Suggested change:
```diff
-    data_file->record_count = metrics.row_count.value_or(0);
+    data_file->record_count = metrics.row_count.value_or(-1);
```
Java impl uses -1 when row count is unavailable.
```cpp
    auto split_offsets = writer_->split_offsets();

    auto data_file = std::make_shared<DataFile>();
    data_file->content = DataFile::Content::kData;
```
nit: use aggregate initialization
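Applied to the snippet above, and assuming DataFile is an aggregate whose fields are declared in this order, that could look like:

```cpp
// Sketch: build the DataFile in one expression. Field names come from
// the quoted code; the declaration order is an assumption.
auto data_file = std::make_shared<DataFile>(DataFile{
    .content = DataFile::Content::kData,
    .file_path = options_.path,
    .file_format = options_.format,
    .partition = options_.partition,
    .record_count = metrics.row_count.value_or(0),
});
```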
```cpp
    // Convert metrics maps from unordered_map to map
    for (const auto& [col_id, size] : metrics.column_sizes) {
      data_file->column_sizes[col_id] = size;
```
Do you think it makes sense to change DataFile and Metrics classes to use std::map or std::unordered_map consistently so we don't need to use a for-loop here?
cc @zhjwpku
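Until the container types are unified, the per-entry loop can at least be collapsed into the iterator-range constructor; a sketch, with the int32_t/int64_t key and value types assumed:

```cpp
// Sketch: copy an unordered_map into an ordered std::map in one
// expression instead of a per-entry loop. Key/value types assumed.
data_file->column_sizes = std::map<int32_t, int64_t>(
    metrics.column_sizes.begin(), metrics.column_sizes.end());
```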
```cpp
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
```
Suggested change:
```diff
-      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
-      SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(2, "name", string())});
```
```cpp
using ::testing::HasSubstr;

class DataWriterTest : public ::testing::Test {
```
Can we try to consolidate the test cases? Each of them only tests a tiny API with repeated boilerplate for creating a writer and writing data, which may lead to a test-case explosion if more cases like this are added.
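One possible shape for that consolidation, sketched under the assumption that the fixture already owns schema_, partition_spec_, and file_io_; MakeWriter is a hypothetical helper name, not code from this PR:

```cpp
// Sketch: concentrate the writer-creation boilerplate in the fixture so
// each test body only states what it actually checks.
class DataWriterTest : public ::testing::Test {
 protected:
  Result<std::unique_ptr<DataWriter>> MakeWriter(
      std::string path, FileFormatType format,
      std::unordered_map<std::string, std::string> properties = {}) {
    DataWriterOptions options{
        .path = std::move(path),
        .schema = schema_,
        .spec = partition_spec_,
        .partition = PartitionValues{},
        .format = format,
        .io = file_io_,
        .properties = std::move(properties),
    };
    return DataWriter::Make(options);
  }
  // schema_, partition_spec_, file_io_ initialized as in the original fixture.
};
```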